+++ /dev/null
-# Copyright (C) 2004 Mike Wray <mike.wray@hp.com>
-
-## XEND_DOMAIN_CREATE = "xend.domain.create": dom
-## create:
-## xend.domain.destroy: dom, reason:died/crashed
-## xend.domain.up ?
-
-## xend.domain.unpause: dom
-## xend.domain.pause: dom
-## xend.domain.shutdown: dom
-## xend.domain.destroy: dom
-
-## xend.domain.migrate.begin: dom, to
-## Begin tells: src host, src domain uri, dst host. Dst id known?
-## err: src host, src domain uri, dst host, dst id if known, status (of domain: ok, dead,...), reason
-## end: src host, src domain uri, dst host, dst uri
-
-## Events for both ends of migrate: for exporter and importer?
-## Include migrate id so can tie together.
-## Have uri /xend/migrate/<id> for migrate info (migrations in progress).
-
-## (xend.domain.migrate.begin (src <host>) (src.domain <id>)
-## (dst <host>) (id <migrate id>))
-
-## xend.domain.migrate.end:
-## (xend.domain.migrate.end (domain <id>) (to <host>)
-
-## xend.node.up: xend uri
-## xend.node.down: xend uri
-
-## xend.error ?
-
-## format:
-
# Copyright (C) 2004 Mike Wray <mike.wray@hp.com>
import socket
+import threading
from xen.web import reactor, protocol
def __init__(self, controller, id, config, recreate=False):
Dev.__init__(self, controller, id, config)
+ self.lock = threading.RLock()
self.status = self.STATUS_NEW
self.addr = None
self.conn = None
[self.id, self.getDomain(), self.console_port])
def init(self, recreate=False, reboot=False):
- self.destroyed = False
- self.channel = self.getChannel()
- self.listen()
+ try:
+ self.lock.acquire()
+ self.destroyed = False
+ self.channel = self.getChannel()
+ self.listen()
+ finally:
+ self.lock.release()
def checkConsolePort(self, console_port):
"""Check that a console port is not in use by another console.
ctrl.checkConsolePort(console_port)
def sxpr(self):
- val = ['console',
- ['status', self.status ],
- ['id', self.id ],
- ['domain', self.getDomain() ] ]
- val.append(['local_port', self.getLocalPort() ])
- val.append(['remote_port', self.getRemotePort() ])
- val.append(['console_port', self.console_port ])
- val.append(['index', self.getIndex()])
- if self.addr:
- val.append(['connected', self.addr[0], self.addr[1]])
+ try:
+ self.lock.acquire()
+ val = ['console',
+ ['status', self.status ],
+ ['id', self.id ],
+ ['domain', self.getDomain() ] ]
+ val.append(['local_port', self.getLocalPort() ])
+ val.append(['remote_port', self.getRemotePort() ])
+ val.append(['console_port', self.console_port ])
+ val.append(['index', self.getIndex()])
+ if self.addr:
+ val.append(['connected', self.addr[0], self.addr[1]])
+ finally:
+ self.lock.release()
return val
def getLocalPort(self):
- if self.channel:
- return self.channel.getLocalPort()
- else:
- return 0
+ try:
+ self.lock.acquire()
+ if self.channel:
+ return self.channel.getLocalPort()
+ else:
+ return 0
+ finally:
+ self.lock.release()
def getRemotePort(self):
- if self.channel:
- return self.channel.getRemotePort()
- else:
- return 0
+ try:
+ self.lock.acquire()
+ if self.channel:
+ return self.channel.getRemotePort()
+ else:
+ return 0
+ finally:
+ self.lock.release()
def uri(self):
"""Get the uri to use to connect to the console.
print 'ConsoleDev>destroy>', self, reboot
if reboot:
return
- self.status = self.STATUS_CLOSED
- if self.conn:
- self.conn.loseConnection()
- self.listener.stopListening()
+ try:
+ self.lock.acquire()
+ self.status = self.STATUS_CLOSED
+ if self.conn:
+ self.conn.loseConnection()
+ self.listener.stopListening()
+ finally:
+ self.lock.release()
def listen(self):
"""Listen for TCP connections to the console port..
"""
- if self.closed():
- return
- if self.listener:
- pass
- else:
- self.status = self.STATUS_LISTENING
- cf = ConsoleFactory(self, self.id)
- interface = xroot.get_console_address()
- self.listener = reactor.listenTCP(self.console_port, cf, interface=interface)
+ try:
+ self.lock.acquire()
+ if self.closed():
+ return
+ if self.listener:
+ pass
+ else:
+ self.status = self.STATUS_LISTENING
+ cf = ConsoleFactory(self, self.id)
+ interface = xroot.get_console_address()
+ self.listener = reactor.listenTCP(self.console_port, cf, interface=interface)
+ finally:
+ self.lock.release()
def connect(self, addr, conn):
"""Connect a TCP connection to the console.
returns 0 if ok, negative otherwise
"""
- if self.closed():
- return -1
- if self.connected():
- return -1
- self.addr = addr
- self.conn = conn
- self.status = self.STATUS_CONNECTED
- self.writeOutput()
+ try:
+ self.lock.acquire()
+ if self.closed():
+ return -1
+ if self.connected():
+ return -1
+ self.addr = addr
+ self.conn = conn
+ self.status = self.STATUS_CONNECTED
+ self.writeOutput()
+ finally:
+ self.lock.release()
return 0
def disconnect(self, conn=None):
"""Disconnect the TCP connection to the console.
"""
print 'ConsoleDev>disconnect>', conn
- if conn and conn != self.conn: return
- if self.conn:
- self.conn.loseConnection()
- self.addr = None
- self.conn = None
- self.status = self.STATUS_LISTENING
- self.listen()
+ try:
+ self.lock.acquire()
+ if conn and conn != self.conn: return
+ if self.conn:
+ self.conn.loseConnection()
+ self.addr = None
+ self.conn = None
+ self.status = self.STATUS_LISTENING
+ self.listen()
+ finally:
+ self.lock.release()
def receiveOutput(self, msg):
"""Receive output console data from the console channel.
subtype minor message typ
"""
# Treat the obuf as a ring buffer.
- data = msg.get_payload()
- data_n = len(data)
- if self.obuf.space() < data_n:
- self.obuf.discard(data_n)
- if self.obuf.space() < data_n:
- data = data[-self.obuf.space():]
- self.obuf.write(data)
- self.writeOutput()
+ try:
+ self.lock.acquire()
+ data = msg.get_payload()
+ data_n = len(data)
+ if self.obuf.space() < data_n:
+ self.obuf.discard(data_n)
+ if self.obuf.space() < data_n:
+ data = data[-self.obuf.space():]
+ self.obuf.write(data)
+ self.writeOutput()
+ finally:
+ self.lock.release()
def writeOutput(self):
"""Handle buffered output from the console device.
Sends it to the connected TCP connection (if any).
"""
- if self.closed():
- return -1
- if not self.conn:
- return 0
- while not self.obuf.empty():
- try:
- bytes = self.conn.write(self.obuf.peek())
- if bytes > 0:
- self.obuf.discard(bytes)
- except socket.error:
- pass
+ try:
+ self.lock.acquire()
+ if self.closed():
+ return -1
+ if not self.conn:
+ return 0
+ while not self.obuf.empty():
+ try:
+ bytes = self.conn.write(self.obuf.peek())
+ if bytes > 0:
+ self.obuf.discard(bytes)
+ except socket.error:
+ pass
+ finally:
+ self.lock.release()
return 0
def receiveInput(self, conn, data):
conn connection
data input data
"""
- if self.closed(): return -1
- if conn != self.conn: return 0
- self.ibuf.write(data)
- self.writeInput()
+ try:
+ self.lock.acquire()
+ if self.closed(): return -1
+ if conn != self.conn: return 0
+ self.ibuf.write(data)
+ self.writeInput()
+ finally:
+ self.lock.release()
return 0
def writeInput(self):
"""Write pending console input to the console channel.
Writes as much to the channel as it can.
"""
- while self.channel and not self.ibuf.empty():
- msg = xu.message(CMSG_CONSOLE, 0, 0)
- msg.append_payload(self.ibuf.read(msg.MAX_PAYLOAD))
- self.channel.writeRequest(msg)
+ try:
+ self.lock.acquire()
+ while self.channel and not self.ibuf.empty():
+ msg = xu.message(CMSG_CONSOLE, 0, 0)
+ msg.append_payload(self.ibuf.read(msg.MAX_PAYLOAD))
+ self.channel.writeRequest(msg)
+ finally:
+ self.lock.release()
class ConsoleController(DevController):
"""Device controller for all the consoles for a domain.